#include "config.h"

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <list.h>
#include <yid.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "volume_proto.h"

#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

extern uint32_t lsv_feature;

/**
 * Allocate a fresh WAL chunk descriptor for @chunk_id with an empty write
 * position (next_page = 0). Ownership of *_chunk passes to the caller, who
 * links it onto one of the wal lists. Always returns 0 (OOM asserts).
 */
int lsv_wal2_chunk_new(uint32_t chunk_id, lsv_wbuf_wal_chunk_t **_chunk) {
        lsv_wbuf_wal_chunk_t *fresh = NULL;
        int ret;

        *_chunk = NULL;

        ret = ymalloc((void **)&fresh, sizeof(*fresh));
        if (unlikely(ret))
                YASSERT(0);

        fresh->chunk_id = chunk_id;
        fresh->next_page = 0;
        fresh->max_lsn = 0;
        fresh->is_frag = 0;

        *_chunk = fresh;

        DINFO("add chunk %u\n", chunk_id);
        return 0;
}

// Log one chunk's identity, list position and write water mark.
static void _print_chunk(lsv_wbuf_wal_chunk_t *wchunk, int idx, const char* name) {
        DINFO("lsn %llu %s %d chunk %u next_page %u\n",
              (LLU)wchunk->max_lsn, name, idx, wchunk->chunk_id,
              wchunk->next_page);
}

// Debug dump of both WAL chunk lists: chunks awaiting commit ("wait")
// followed by reusable chunks ("free"), then the list counters.
static void _print_chunk_list(lsv_wbuf_wal_t *wal) {
        struct list_head *cur, *tmp;
        lsv_wbuf_wal_chunk_t *entry;
        int idx;

        idx = 0;
        list_for_each_safe(cur, tmp, &wal->wait_list.list) {
                entry = list_entry(cur, lsv_wbuf_wal_chunk_t, hook);
                _print_chunk(entry, idx++, "wait");
        }

        idx = 0;
        list_for_each_safe(cur, tmp, &wal->free_list.list) {
                entry = list_entry(cur, lsv_wbuf_wal_chunk_t, hook);
                _print_chunk(entry, idx++, "free");
        }

        DINFO("wait %u free %u\n", wal->wait_list.count, wal->free_list.count);
}

// -- malloc chunk management


// Persist the full chunk-id list (wait list first, then free list) into the
// volume's master record so that _decode_chunk_list() can rebuild the WAL
// chunk ring after a restart. Returns 0 on success, error from the chunk
// update on failure.
static int _encode_chunk_list(lsv_volume_proto_t *lsv_info) {
        int ret;
        segment_head_t *seg_head;
        uint32_t chunk_num, buflen, buflen2;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;

        // TODO 4K alignment
        chunk_num = wal->wait_list.count + wal->free_list.count;
        buflen = sizeof(segment_head_t) + sizeof(uint32_t) * chunk_num;

        // Round the on-disk buffer length up to a whole number of pages.
        buflen2 = buflen;
        if (buflen % LSV_PAGE_SIZE != 0) {
                buflen2 += LSV_PAGE_SIZE - buflen % LSV_PAGE_SIZE;
        }

        YASSERT(buflen2 % LSV_PAGE_SIZE == 0);

        ret = ymalloc((void **)&seg_head, buflen2);
        if (unlikely(ret))
                YASSERT(0);

        // Zero the whole page-aligned buffer so the tail padding is defined.
        memset(seg_head, 0, buflen2);
        seg_head->buflen = buflen2;
        seg_head->crc = 0;
        seg_head->magic = WAL_HEAD_MAGIC;
        seg_head->version = WAL_HEAD_VERSION;
        seg_head->segment_num = 0;
        seg_head->segment_len = 0;
        seg_head->chunk_num = chunk_num;

        {
                struct list_head *pos, *n;
                lsv_wbuf_wal_chunk_t *wchunk;

                // encode
                int chunk_idx = 0;

                // from wait -> free: recovery replays in exactly this order
                list_for_each_safe(pos, n, &wal->wait_list.list) {
                        wchunk = list_entry(pos, lsv_wbuf_wal_chunk_t, hook);
                        seg_head->chunk_ids[chunk_idx++] = wchunk->chunk_id;
                }

                list_for_each_safe(pos, n, &wal->free_list.list) {
                        wchunk = list_entry(pos, lsv_wbuf_wal_chunk_t, hook);
                        seg_head->chunk_ids[chunk_idx++] = wchunk->chunk_id;
                }

                YASSERT(chunk_idx == chunk_num);

                // dump to master record
                // NOTE(review): assumes the page right after wbuf_page_id in
                // LSV_PRIM_CHUNK_ID is reserved for this list — confirm
                // against the volume layout.
                uint32_t off = (lsv_info->u.wbuf_page_id + 1)* LSV_PAGE_SIZE;
                lsv_volume_io_t vio;
                lsv_volume_io_init(&vio, LSV_PRIM_CHUNK_ID, off, buflen2, LSV_WBUFFER_STORAGE_TYPE);
                DINFO("dump chunk0 [off %u size %u]: segment %u chunk %u\n", off, buflen2,
                      seg_head->segment_num, chunk_num);
                ret = lsv_volume_chunk_update(lsv_info, &vio, (lsv_s8_t *)seg_head);
                if (unlikely(ret)){
                        DERROR("PERSISTENT WBUF'S CHUNK_ID INFO FAILED!!!\n");
                        GOTO(err_ret, ret);
                }
        }

        yfree((void **)&seg_head);
        return 0;
err_ret:
        yfree((void **)&seg_head);
        return ret;
}

#define IO_CHUNK_MAX 5

/* Release every chunk image held in the scratch array. */
static void _free_chunk_data(void **data, int num) {
        for (int j = 0; j < num; j++) {
                if (data[j] != NULL) {
                        yfree(&data[j]);
                }
        }
}

/**
 * Replay the WAL after a restart.
 *
 * Walks the persisted chunk ring starting at the checkpointed chunk and
 * REDOes every IO record until a record with a bad magic or a stale LSN is
 * found; from that point on all remaining chunks go to the free list.
 * Fully replayed chunks go to the wait list. Returns 0 on success.
 *
 * Fixes vs previous revision: chunk images are no longer leaked on the two
 * stop paths; continuation frags load frag->chunk_id instead of the stale
 * chunk_ids[idx]; water_mark resets when an IO ends exactly at a chunk
 * boundary; dead err_free label removed.
 */
static int _decode_chunk_list(lsv_volume_proto_t *lsv_info, checkpoint_t *cp) {
        int ret;
        segment_head_t *seg_head;

        ret = read_chunk_list(lsv_info, &seg_head);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // Find the replay start: the checkpointed chunk, or the first
        // recorded chunk when there is no checkpoint (head_chunk == 0).
        int found = -1;
        for (int i=0; i < seg_head->chunk_num; i++) {
                if (cp->head_chunk == 0 || cp->head_chunk == seg_head->chunk_ids[i]) {
                        found = i;
                        break;
                }
        }

        YASSERT(found >= 0);

        // REDO chunk by chunk
        int idx;
        uint32_t chunk_id;

        int stop = 0;
        // TODO the starting water mark is not necessarily 0: an IO can span
        // chunks, and the commit point may have split an IO.
        uint32_t water_mark = 0;
        void *data[IO_CHUNK_MAX];
        uint64_t max_lsn = 0;

        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        wal_ioctx_t *ioctx;
        io_frag_t *frag;
        lsv_s8_t *new_buf;
        lsv_wbuf_wal_chunk_t *chunk;

        int chunk_iter = 0;
        while (chunk_iter < seg_head->chunk_num) {
                idx = (found + chunk_iter) % seg_head->chunk_num;
                chunk_id = seg_head->chunk_ids[idx];
                DINFO("chunk %d/%d %d %u\n", chunk_iter, seg_head->chunk_num, idx, chunk_id);
                if (stop) {
                        // Replay already ended: everything else is reusable.
                        DINFO("add chunk %u to free list\n", chunk_id);
                        lsv_wal2_chunk_new(chunk_id, &chunk);
                        count_list_add_tail(&chunk->hook, &wbuf->wal->free_list);

                        chunk_iter++;
                } else {
                        // TODO array size should equal the IO fragment count
                        memset(data, 0, sizeof(void *) * IO_CHUNK_MAX);

                        ret = _load_chunk(lsv_info, seg_head->chunk_ids[idx], &data[0]);
                        if (unlikely(ret))
                                YASSERT(0);

                        DINFO("load %d chunk %u page %u\n", chunk_iter, chunk_id, water_mark);
                        // The next record header sits at the current water mark.
                        ioctx = (wal_ioctx_t *)(data[0] + LSV_PAGE_SIZE * water_mark);

                        DINFO("lsn %llu frag %u off %llu size %u\n", (LLU)ioctx->lsn, ioctx->frag_num,
                              (LLU)ioctx->lba, ioctx->size);

                        if (ioctx->magic != WAL_HEAD_MAGIC) {
                                // No valid record header here: end of the log.
                                DINFO("stop magic %u\n", ioctx->magic);
                                stop = 1;

                                DINFO("add chunk %u page %u to free list\n", chunk_id, water_mark);
                                lsv_wal2_chunk_new(chunk_id, &chunk);
                                chunk->next_page = water_mark;
                                chunk->max_lsn = max_lsn;
                                count_list_add_tail(&chunk->hook, &wbuf->wal->free_list);

                                // fix: the image loaded above used to leak here
                                _free_chunk_data(data, IO_CHUNK_MAX);
                                chunk_iter++;
                                continue;
                        }

                        if (max_lsn == 0) {
                                max_lsn = ioctx->lsn;
                        }

                        if (ioctx->lsn < cp->last_lsn || ioctx->lsn < max_lsn) {
                                // A stale LSN means we ran past the log's end.
                                DINFO("stop commit_lsn %llu max_lsn %llu lsn %llu\n",
                                      (LLU)cp->last_lsn, (LLU)max_lsn, (LLU)ioctx->lsn);
                                stop = 1;

                                DINFO("add chunk %u page %u to free list\n", chunk_id, water_mark);
                                lsv_wal2_chunk_new(chunk_id, &chunk);
                                chunk->next_page = water_mark;
                                chunk->max_lsn = max_lsn;
                                count_list_add_tail(&chunk->hook, &wbuf->wal->free_list);

                                // fix: the image loaded above used to leak here
                                _free_chunk_data(data, IO_CHUNK_MAX);
                                chunk_iter++;
                                continue;
                        }

                        max_lsn = ioctx->lsn;

                        // get io
                        {
                                io_t io;
                                io_init(&io, NULL, NULL, ioctx->lba, ioctx->size, 0);
                                io.lsn = ioctx->lsn;

                                buffer_t mbuf;
                                mbuffer_init(&mbuf, 0);

                                for (int j = 0; j < ioctx->frag_num; j++) {
                                        frag = &ioctx->frags[j];

                                        DINFO("lsn %llu chunk %u page %u %u off %llu %u\n", (LLU)ioctx->lsn,
                                              frag->chunk_id,
                                              frag->page_idx, frag->page_num,
                                              (LLU)frag->off, frag->size);

                                        // load chunk data; fix: frags past the
                                        // first live in frag->chunk_id, not in
                                        // chunk_ids[idx] — idx is not advanced
                                        // inside this loop (for j == 0 the two
                                        // are asserted equal below)
                                        if (data[j] == NULL) {
                                                ret = _load_chunk(lsv_info, frag->chunk_id, &data[j]);
                                                if (unlikely(ret))
                                                        YASSERT(0);
                                        }
                                        new_buf = data[j] + LSV_PAGE_SIZE * frag->page_idx;

                                        if (j == 0) {
                                                YASSERT(frag->chunk_id == seg_head->chunk_ids[idx]);
                                                YASSERT(frag->page_idx == water_mark);

                                                // the first frag carries the record header
                                                int hl = get_header_len();
                                                mbuffer_appendmem(&mbuf, new_buf + hl, frag->size - hl);
                                        } else {
                                                // continuation frags start at page 0 of a new chunk
                                                YASSERT(frag->page_idx == 0);

                                                mbuffer_appendmem(&mbuf, new_buf, frag->size);
                                        }

                                        water_mark = frag->page_idx + frag->page_num;
                                        YASSERT(water_mark <= LSV_WBUF_PAGE_NUM);
                                        if (water_mark == LSV_WBUF_PAGE_NUM) {
                                                DINFO("add chunk %u page %u to wait list\n", chunk_id, water_mark);
                                                lsv_wal2_chunk_new(frag->chunk_id, &chunk);
                                                chunk->next_page = water_mark;
                                                chunk->max_lsn = max_lsn;
                                                count_list_add_tail(&chunk->hook, &wbuf->wal->wait_list);

                                                chunk_iter++;
                                                // fix: the next record (whether a
                                                // continuation frag or a fresh IO)
                                                // starts at page 0 of the next chunk
                                                water_mark = 0;
                                        }
                                }

                                // TODO
                                YASSERT(mbuf.len == ioctx->size);
                                write_data_buffer(lsv_info, &io, &mbuf);

                                mbuffer_free(&mbuf);

                                _free_chunk_data(data, IO_CHUNK_MAX);

                                DINFO("update wbuf lsn %llu %llu\n", (LLU)wbuf->last_lsn, (LLU)io.lsn);
                                if (wbuf->last_lsn < io.lsn) {
                                        wbuf->last_lsn = io.lsn;
                                }
                                idx = (found + chunk_iter) % seg_head->chunk_num;
                                chunk_id = seg_head->chunk_ids[idx];
                                DINFO("REDO lsn %llu chunk %u next %u %u %u off %llu %u\n", (LLU)io.lsn,
                                                chunk_id, chunk_iter, chunk_id, water_mark, io.lsn, io.size);
                        }
                }
        }

        DINFO("wbuf lsn %llu\n", (LLU)wbuf->last_lsn);

        yfree((void **)&seg_head);
        return 0;
err_ret:
        return ret;
}

/**
 * Pick the next WAL chunk with free pages, refilling the free list from the
 * volume allocator when it runs dry. Full chunks encountered on the way are
 * migrated to the wait list. Always returns 0 (allocation failure asserts).
 *
 * Fixes vs previous revision: the static cursor was named __cursor__, which
 * is a reserved identifier (C11 7.1.3); the cursor is now reset when the
 * node it points at leaves the free list (previously it kept walking a node
 * that had been moved to the wait list); %d -> %u for the uint32_t chunk_id.
 */
int _get_next_chunk(lsv_volume_proto_t *lsv_info, lsv_wbuf_wal_chunk_t **wchunk) {
        int ret;
        lsv_wbuf_wal_chunk_t *_wchunk;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        struct list_head *pos;
        // Round-robin cursor over the free list for spiral writes.
        static struct list_head *spiral_cursor = NULL;

        *wchunk = NULL;

        while (1) {
                // Double-checked: grow the free list only if it is still
                // empty after taking the write lock.
                if (list_empty_careful(&wal->free_list.list)) {
                        lsv_wrlock(&wal->malloc_rwlock);
                        if (list_empty_careful(&wal->free_list.list)) {
                                ret = lsv_wal2_malloc(lsv_info, MALLOC_NUM);
                                if (unlikely(ret))
                                        YASSERT(0);
                        }
                        lsv_unlock(&wal->malloc_rwlock);
                }

                if (lsv_feature & LSV_FEATURE_SPIRAL_WRITE) {
                        if (spiral_cursor == NULL) {
                                spiral_cursor = wal->free_list.list.next;
                        } else {
                                spiral_cursor = spiral_cursor->next;
                                // skip the list-head sentinel
                                if (spiral_cursor == &wal->free_list.list)
                                        spiral_cursor = spiral_cursor->next;
                        }
                        pos = spiral_cursor;
                } else {
                        pos = wal->free_list.list.next;
                }
                _wchunk = list_entry(pos, lsv_wbuf_wal_chunk_t, hook);
                _print_chunk(_wchunk, 0, __FUNCTION__);

                // TODO if only one page left?
                if (_chunk_is_full(_wchunk)) {
                        DINFO("chunk %u is full\n", _wchunk->chunk_id);

                        count_list_del_init(pos, &wal->free_list);
                        count_list_add_tail(pos, &wal->wait_list);
                        // fix: the node the cursor pointed at just left the
                        // free list; restart from the head next time instead
                        // of walking into the wait list.
                        if (pos == spiral_cursor)
                                spiral_cursor = NULL;
                } else {
                        break;
                }
        }

        *wchunk = _wchunk;

        DINFO("wait %u free %u\n", wal->wait_list.count, wal->free_list.count);
        return 0;
}

/**
 * Allocate and wire up the WAL state for this volume's write buffer.
 * Chunks are not allocated here; they arrive lazily via lsv_wal2_malloc().
 * Returns 0 on success, the ymalloc error otherwise.
 */
int lsv_wal2_init(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_wal_t *wal = NULL;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        int ret;

        ret = ymalloc((void **)&wal, sizeof(*wal));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // Both chunk lists start empty.
        count_list_init(&wal->free_list);
        count_list_init(&wal->wait_list);

        co_cond_init(&wal->full_cond);
        lsv_rwlock_init(&wal->malloc_rwlock);

        wbuf->wal = wal;

        return 0;
err_ret:
        return ret;
}

// Teardown counterpart of lsv_wal2_init(); currently a no-op.
// NOTE(review): the wal allocated in lsv_wal2_init() and the chunk
// descriptors on its lists are not released here — presumably freed
// elsewhere on volume teardown, or leaked; confirm and free if needed.
int lsv_wal2_destroy(lsv_volume_proto_t *lsv_info) {
        return 0;
}

/**
 * Allocate @count new WAL chunks from the volume and append them to the
 * free list, then persist the updated chunk-id list (best effort).
 * Returns 0 on success, the batch-allocation error otherwise.
 *
 * Fixes vs previous revision: loop index is uint32_t (was a signed/unsigned
 * comparison against count); the ignored return of _encode_chunk_list() is
 * now at least logged — it stays best-effort because the chunks are already
 * live in memory either way.
 */
int lsv_wal2_malloc(lsv_volume_proto_t *lsv_info, uint32_t count) {
        int ret;
        lsv_wbuf_wal_chunk_t *wchunk;
        uint32_t *new_chunk_ids;

        ret = ymalloc((void **)&new_chunk_ids, count * sizeof(uint32_t));
        if (unlikely(ret))
                YASSERT(0);

        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        ret = lsv_volume_chunk_malloc_batch(lsv_info, LSV_WBUFFER_STORAGE_TYPE, count, new_chunk_ids);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (uint32_t i = 0; i < count; ++i) {
                lsv_wal2_chunk_new(new_chunk_ids[i], &wchunk);
                count_list_add_tail(&wchunk->hook, &wbuf->wal->free_list);
                DINFO("i %u chunk_id %u\n", i, new_chunk_ids[i]);
        }

        yfree((void **)&new_chunk_ids);

        _print_chunk_list(wbuf->wal);

        // Best effort: failure is logged but does not undo the allocation.
        ret = _encode_chunk_list(lsv_info);
        if (unlikely(ret))
                DERROR("encode chunk list failed, ret %d\n", ret);

        return 0;
err_ret:
        yfree((void **)&new_chunk_ids);
        return ret;
}

/**
 * Lay out one IO (header + payload) across WAL chunks, filling @ctx with
 * the record header and one io_frag_t per chunk touched. Advances each
 * chunk's next_page water mark and tags its max_lsn. Always returns 0
 * (chunk acquisition failure asserts).
 *
 * Fix vs previous revision: removed the unused wbuf/wal locals (they
 * triggered unused-variable warnings and implied state this function does
 * not touch).
 */
int lsv_wal2_append_prepare(lsv_volume_proto_t *lsv_info, const io_t *io, wal_ioctx_t *ctx) {
        int ret;
        lsv_wbuf_wal_chunk_t *wchunk;
        uint64_t off;
        int left;
        uint32_t step, free_size;
        io_frag_t *frag;

        // Record header for this IO.
        ctx->magic = WAL_HEAD_MAGIC;
        ctx->version = WAL_HEAD_VERSION;
        ctx->head_len = get_header_len();
        ctx->crc = 0;

        ctx->lsn = io->lsn;

        ctx->lba = io->offset;
        ctx->size = io->size;
        ctx->frag_num = 0;

        off = io->offset;

        // Total bytes to place: record header plus payload.
        left = ctx->head_len + io->size;

        // Split the record over as many chunks as it needs; each iteration
        // claims the free tail of one chunk.
        DINFO("lsn %llu off %llu left %u = (%u + %u)\n", (LLU)ctx->lsn, (LLU)off, left, ctx->head_len, io->size);
        while(left > 0) {
                // lock free, wait free, no yield, no page alignment
                ret = _get_next_chunk(lsv_info, &wchunk);
                if (unlikely(ret))
                        YASSERT(0);

                free_size = (LSV_WBUF_PAGE_NUM - wchunk->next_page) * LSV_PAGE_SIZE;
                YASSERT(free_size >= LSV_PAGE_SIZE);

                step = _min(left, free_size);

                // new io frag
                // NOTE(review): no bound check on ctx->frag_num against the
                // frags[] capacity — presumably IO_CHUNK_MAX; confirm.
                frag = &ctx->frags[ctx->frag_num];

                frag->off = off;
                frag->size = step;
                // TODO first frag + header

                // where the frag lands: chunk, starting page, page span
                frag->chunk_id = wchunk->chunk_id;
                frag->page_idx = wchunk->next_page;
                frag->page_num = _page_num(step);

                DINFO("left %u free_size %u step %u\n", left, free_size, step);
                DINFO("lsn %llu frag %u chunk %u page %u %u off %llu size %u\n",
                      (LLU)ctx->lsn,
                      ctx->frag_num,
                      frag->chunk_id,
                      frag->page_idx, frag->page_num,
                      (LLU)frag->off, frag->size);

                // when next_page reaches LSV_WBUF_PAGE_NUM the chunk is full
                wchunk->max_lsn = io->lsn;
                if (wchunk->next_page == 0 && ctx->frag_num > 0) {
                        // a continuation frag opened this chunk
                        wchunk->is_frag = 1;
                }
                wchunk->next_page += _page_num(step);
                YASSERT(wchunk->next_page <= LSV_WBUF_PAGE_NUM);

                ctx->frag_num++;

                // next IO frag
                left -= step;
                off += step;
        }

        YASSERT(left == 0);
        return 0;
}

/**
 * Recycle WAL chunks whose data is fully committed (max_lsn < lsn): reset
 * their water mark and move them from the wait list back to the free list.
 * If anything was recycled, persist a new checkpoint pointing at the oldest
 * still-needed chunk. Always returns 0.
 */
int lsv_wal2_commit(lsv_volume_proto_t *lsv_info, uint64_t lsn) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        lsv_wbuf_wal_chunk_t *wchunk;
        struct list_head *pos, *n;
        uint64_t commit_lsn = 0;
        BOOL need_checkpoint = FALSE;

        DINFO("lsn %llu\n", (LLU)lsn);
        list_for_each_safe(pos, n, &wal->wait_list.list) {
                wchunk = list_entry(pos, lsv_wbuf_wal_chunk_t, hook);
                YASSERT(_chunk_is_full(wchunk));
                if (wchunk->max_lsn >= lsn) {
                        // TODO
                        break;
                }

                DINFO("chunk %u lsn %llu < %llu %d\n", wchunk->chunk_id, (LLU)wchunk->max_lsn, (LLU)lsn,
                      wchunk->max_lsn < lsn ? 1 : 0);

                // committed, can gc
                wchunk->next_page = 0;

                count_list_del_init(pos, &wal->wait_list);
                count_list_add_tail(pos, &wal->free_list);

                commit_lsn = wchunk->max_lsn;
                need_checkpoint = TRUE;
        }

        if (need_checkpoint) {
                lsv_wbuf_wal_chunk_t *head;

                // The checkpoint records the oldest still-needed chunk: the
                // first waiter, or the first free chunk when none wait.
                if (list_empty_careful(&wal->wait_list.list)) {
                        YASSERT(!list_empty_careful(&wal->free_list.list));
                        pos = wal->free_list.list.next;
                } else {
                        pos = wal->wait_list.list.next;
                }

                head = list_entry(pos, lsv_wbuf_wal_chunk_t, hook);

                // NOTE: a commit point must never split an IO across chunks;
                // unlike segment management, a chunk's starting water mark is
                // not guaranteed to be 0, so recovery could otherwise fail to
                // locate the IO header.
                _encode_checkpoint(lsv_info, head->chunk_id, commit_lsn);
        }

        DINFO("wait %u free %u\n", wal->wait_list.count, wal->free_list.count);
        return 0;
}

// Stub: checkpointing currently happens from lsv_wal2_commit() via
// _encode_checkpoint(); nothing to do here yet.
int lsv_wal2_checkpoint(lsv_volume_proto_t *lsv_info) {
        //
        return 0;
}

/**
 * Recover WAL state at volume load: read the last checkpoint, restore the
 * write buffer's LSN, then replay the chunk ring.
 *
 * Fix vs previous revision: the return value of _decode_chunk_list() was
 * silently discarded, so a failed replay reported success; it is now
 * propagated to the caller.
 */
int lsv_wal2_load(lsv_volume_proto_t *lsv_info) {
        int ret;

        DINFO("load\n");

        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        checkpoint_t cp;

        YASSERT(wal != NULL);

        _decode_checkpoint(lsv_info, &cp);

        wbuf->last_lsn = cp.last_lsn;

        ret = _decode_chunk_list(lsv_info, &cp);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#if 0

/** @deprecated
 *
 * Retired synchronous-over-async append path, kept for reference only.
 * NOTE(review): __do_wal2_append uses a double-underscore prefix, which is
 * reserved for the implementation (C11 7.1.3) — rename if ever revived.
 */
typedef struct {
        lsv_volume_proto_t *lsv_info;
        void *buf;

        int ret;

        // completion flag + condvar used to wait for the queued task
        int done;
        co_cond_t cond;
} append_io_ctx_t;

static int __do_wal2_append(void *arg) {
        int ret;
        append_io_ctx_t *ctx = arg;
        lsv_volume_proto_t *lsv_info = ctx->lsv_info;

        ret = lsv_wal2_append(lsv_info, ctx->buf);
        if (unlikely(ret)) {
                DERROR("ret %d\n", ret);
        }

        // publish the result before waking the waiter
        ctx->done = TRUE;
        ctx->ret = ret;
#if ENABLE_WBUF_MQ
        co_cond_broadcast(&ctx->cond, ret);
#endif

        DINFO("exit ret %d\n", ret);
        return 0;
}

int lsv_wal2_append_async(lsv_volume_proto_t *lsv_info, char *buffer) {
        int ret;
        lsv_wbuf_t *wbuf = (lsv_wbuf_t *)lsv_info->wbuf;

        YASSERT(buffer != NULL);

        append_io_ctx_t ctx;
        ctx.lsv_info = lsv_info;
        ctx.buf = buffer;
        ctx.done = FALSE;
        ctx.ret = 0;

#if ENABLE_WBUF_MQ
        // queue the append on the write mq and block until it completes
        co_cond_init(&ctx.cond);
        ret = co_mq_offer(&wbuf->write_mq, "wal_append", __do_wal2_append, &ctx, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        while(!ctx.done) {
                co_cond_wait(&ctx.cond);
        }

        ret = ctx.ret;
        if (unlikely(ret))
                GOTO(err_ret, ret);
#else
        // schedule_task_new("wal_append", __do_wal2_append, &ctx, -1);

        // mq disabled: run the append inline
        ret = __do_wal2_append(&ctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
        err_ret:
        return ret;
}

#endif
